IntelliJ IDEA上搭建Flink开发环境 | 您所在的位置:网站首页 › scala 写文件 › IntelliJ IDEA上搭建Flink开发环境 |
通过IntelliJ IDEA搭建Flink开发环境,首先要安装Flink和Scala,具体操作请参照:
Flink安装:https://blog.csdn.net/x976269167/article/details/105700963 Scala安装:https://blog.csdn.net/x976269167/article/details/105740307 1、创建一个maven工程https://plugins.jetbrains.com/plugin/1347-scala
![]() ![]()
(1)在scala目录下新建一个scala,代码如下, import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} object WordCountScala { def main(args: Array[String]): Unit = { //生成了配置对象 val config = new Configuration() //打开flink-webui config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true) //配置webui的日志文件,否则打印日志到控制台,这样你的控制台就清净了 config.setString("web.log.path", "D:\\Java\\Logs\\Flink\\log.file") //配置taskManager的日志文件,否则打印日志到控制台,这样你的控制台就清净了 config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "D:\\Java\\Logs\\Flink\\log.file") //获得local运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config) //定义socket的source源 val text: DataStream[String] = env.socketTextStream( hostname="localhost", port = 6666) //scala开发需要加一行隐式转换,否则在调用operator的时候会报错,作用是找到scala类型的TypeInformation import org.apache.flink.api.scala._ //定义operators,作用是解析数据,分组,并且求wordCount val wordCount: DataStream[(String, Int)] = text.flatMap(_.split(" ")).map((_,1)).keyBy(_._1).sum( position = 1) //定义sink,打印数据到控制台 wordCount.print() //定义任务的名称并运行 //注意:operator是惰性的,只有遇到execute才执行 env.execute(jobName = "SocketWordCount") } }(2) 打开cmd,输入如下命令, nc -l -p 6666 (3) 右键刚才scala下添加的方法,点击运行main方法 (4)在cmd中随便输入数字,可以看到控制台在计算了,验证成功
|
CopyRight 2018-2019 实验室设备网 版权所有 |